package net.bwie.realtime.jtp.dws.log.function;

import net.bwie.realtime.jtp.dws.log.bean.PageViewBean;
import org.apache.flink.api.common.functions.ReduceFunction;

/**
 * 创建ReduceFunction函数实例，对每分钟窗口中数据进行增量计算
 *      ReduceFunction函数要求聚合结果类型，必须与窗口中函数类型一致
 */
public class PageViewReportReduceFunctionOptimize implements ReduceFunction<PageViewBean> {
    @Override
    public PageViewBean reduce(PageViewBean pageViewBean, PageViewBean t1) throws Exception {

        /*
            pageViewBean:表示增加计算中间结果，t1:表示窗口中每条数据
         */
        //1-增量计算：各个指标值相加
        pageViewBean.setPvCount(pageViewBean.getPvCount()+t1.getPvCount());
        pageViewBean.setPvDuringTime(pageViewBean.getPvDuringTime()+t1.getPvDuringTime());
        pageViewBean.setUvCount(pageViewBean.getUvCount()+t1.getUvCount());
        pageViewBean.setSessionCount(pageViewBean.getSessionCount()+t1.getSessionCount());
        //2-返回结果
        return pageViewBean;
    }
}
